Teo Sakel
10/01/2022
Adapted from SCHMID, R. (2006). Computational Genome Analysis
Distance of "this" vs "that" = 2
To extend the definition to strings of unequal length:
-def hamming(S0, S1):
L = len(S0), len(S1)
if L[0] == L[1]:
return sum(x != y for x, y in zip(S0, S1))
if L[0] < L[1]:
# reverse so that S0 is longer (distance is symmetric)
S0, S1 = S1, S0
padding = '-' * (L[1] - L[0])
else:
padding = '-' * (L[0] - L[1])
pref = hamming(S0, padding + S1) # pad at the begining
suff = hamming(S0, S1 + padding) # pad at the end
dist = min(pref, suff) # shortest path from S0 -> S1
return min(pref, suff) Distance of "BANANA" vs "BANA" = 2
pref vs suff
BANANA BANANA
--BANA BANA--
-.Input
S = HEAGAWGHEEAHGEGAE
Q = PAWHEAEHE
Global Alignment
HEAGAWGHEEAHGEGAE
--P-AW-HE-A--EH-E
Local Alignment
WGHEEAHGE
AW-HEAEHE
Hamming = 3 | Levensthein = 1
------------|----------------
MISSISSIPI | MISSISSIPI
-MISISSIPI | MIS-ISSIPI
| Amino Acids | Nucleotides |
|---|---|
def hamming(S0, S1, M):
# score given by matrix M
L = len(S0), len(S1)
if L[0] == L[1]:
return sum(M[x, y] for x, y in zip(S0, S1))
if L[0] < L[1]:
S0, S1 = S1, S0
padding = '-' * (L[1] - L[0])
else:
padding = '-' * (L[0] - L[1])
pref = hamming(S0, padding + S1, M)
suff = hamming(S0, S1 + padding, M)
return max(pref, suff) # max instead of minDistance of "PANAMA" vs "PAMA" based on BLOSUM62 = 5
A visual representation of all possible local alignments.
tomorrow and tomorrow and tomorrowcan i see bees in a caveTrade-off variance (noise) for bias (away from 0)
\(k\)-mers are substrings of length \(k\) contained in a sequence
MIS
ISS
SSI
SIS
ISS
SSI
SIP
IPP
PPI
MISSISSIPPI
With some extras…
dict of \(k\)-mers positionsAdapted form Harris, R.S., (2007) “Improved pairwise alignment of genomic DNA”
class BLAST90:
def __init__(self, reference, score, k=3, T=13):
self.score = score # score matrix
self.k = k # length of word
self.T = T # Threshold
self.store_reference(reference)
self.scaling_const = self.score.calc_K_lambda(self.ref)
self.index = self.build_index(self.ref)
def store_reference(self, reference):
'''Concatanate the sequences of all proteins
Sequences are separated by a string of `k` gaps (-).
That way there can be no seed spanning 2 proteins.
It also creates a list of "ranges" spanned by each
protein in the combined reference. Thus protein[i]
spans the region of the reference between
prot_ranges[i-1]:prot_ranges[i]
'''
self.proteins = list(reference.keys())
self._prot_ranges = []
self.ref = ''
sep = self.score.gap_char * self.k # protein separator
for prot, seq in reference.items():
self.ref += seq + sep
self._prot_ranges.append((len(self.ref), prot))
def build_index(self, reference):
'''Returns an index of informative words (k-mers).
The index is a dictionary that maps words to positions
in the reference that are similar to it.
Iterate over all k-mers and store the positions where
they appear in the reference. Then iterate over all
pairs of words and merge their "hits" if they are similar.
'''
hits = {}
# get positions of words
for i, word in enumerate(self.iter_words(reference)):
if self.score(word, word) < self.T:
# non-informative word, self-similarity == max(score)
continue
try:
hits[word].append(i)
except KeyError:
hits[word] = [i]
# merge hits of 2 words if they are similar
index = hits.copy()
words = list(index.keys())
for i, w1 in enumerate(words[:-1]):
for w2 in words[i+1:]:
if self.score(w1, w2) >= self.T:
index[w1] = self.merge_sorted(index[w1], hits[w2])
index[w2] = self.merge_sorted(index[w2], hits[w1])
return index
def get_protein(self, pos):
'''Given a position in the combined reference,
it returns the protein it belongs to.
`self._prot_ranges` is sorted by construction so we can
use bisect_left to find quickly the index of the protein.
'''
ix = bisect_left(self._prot_ranges, (pos, ))
return self._prot_ranges[ix][1]
def search(self, query, Xdrop, Eval_cutoff):
'''Searches for query in reference and returns all the position is maps.
Args:
- query: string to look for
- Xdrop: heuristic to stop expanding the match
- Eval_cutoff: ignore matches with greater E-value.
Return:
list with elements of the form (Eval, seed, offset, prot) where:
+ Eval = E-value
+ seed = (i, j) starting position in dot-matrix coordinates
+ offset = (bck, fwd) how far forward and backwards the alignment extends
+ prot = the protein the alignment belongs to
'''
hits = []
scanned = self.diagRanges() # to avoid double checking a hit
for i, word in enumerate(self.iter_words(query)):
for j in self.index.get(word, []):
seed = (i, j)
if seed in scanned:
continue
Eval, offset = self.extend_seed(seed, query, Xdrop)
scanned.append(seed, offset)
if Eval < Eval_cutoff:
prot = self.get_protein(j)
# heappush keeps `hits` "sorted"
heappush(hits, (Eval, seed, offset, prot))
return [heappop(hits) for i in range(len(hits))]
def extend_seed(self, seed, pattern, Xdrop):
'''Given a starting position (seed) and a pattern it extend the
alignment in both directions until the score drops below Xdrop.
'''
k = self.k
i, j = seed # i: query, j: subject
# extend forward
query = self.iter_seq(pattern , i, 1)
subject = self.iter_seq(self.ref, j, 1)
score_f, off_f = self.extend_ungapped(query, subject, Xdrop)
# extend in reverse
query = self.iter_seq(pattern , i+k, -1)
subject = self.iter_seq(self.ref, j+k, -1)
score_r, off_r = self.extend_ungapped(query, subject, Xdrop)
# final score = fwd + rev - word (word was added twice)
score = score_f + score_r - self.score(pattern[i:i+k], self.ref[j:j+k])
l = off_f + off_r - k # length of extension
m = len(pattern)
Eval = self.compute_Eval(score, m, l)
return Eval, (k - off_r, off_f)
def extend_ungapped(self, pattern, ref, Xdrop):
'''Extends the pattern along the reference until the score drops
Xdrop below the maximum.
'''
score = 0 # running score
max_score, imax = 0, 0
for i, (p, r) in enumerate(zip(pattern, ref)):
score += self.score[p, r]
if score >= max_score:
# '==' is included because new max is longer
max_score, imax = score, i
elif max_score - score > Xdrop:
break
return max_score, imax + 1
def compute_Eval(self, score, m, l):
'''Computes the E-val statistic'''
K, lam = self.scaling_const
S = (lam * score - np.log(K)) / np.log(2)
n = len(self.ref) - l + 1
m = m - l + 1
E = m * n * 2**(-S)
return E
def iter_words(self, text):
'''Iterate over all k-mers of a text'''
N = len(text)
k = self.k
for i in range(N-k+1):
yield text[i:i+k]
def iter_seq(self, text, offset=0, step=1):
'''Iterate over text forward or backward (by step) starting at offset'''
if step > 0:
text_range = range(offset, len(text), step)
else:
text_range = range(offset-1, -1, step)
for i in text_range:
yield text[i]
def iter_seeds(self, query):
'''Get all the potential seeds for a query'''
for i, word in enumerate(self.iter_words(query)):
for j in self.index.get(word, []):
yield i, j
@staticmethod
def merge_sorted(X, Y):
'''
Given 2 value ascending lists X and Y create a new sorted list Z.
Concatanating and sorting afterwards would be wasteful since X, Y have order we can exploit.
Based on actual `merge_sort`
'''
Lx, Ly = len(X), len(Y)
# check if there is something to merge
if Lx == 0:
return Y
if Ly == 0:
return X
Z = [None] * (Lx + Ly)
iz, ix, iy = 0, 0, 0
while True:
if X[ix] > Y[iy]:
Z[iz] = Y[iy]
iy += 1
else:
Z[iz] = X[ix]
ix += 1
iz += 1
if ix == Lx: # X is exhausted
Z[iz:] = Y[iy:]
break
elif iy == Ly: # Y is exhausted
Z[iz:] = X[ix:]
break
return Z
# inside BLAST
class diagRanges:
'''
Data structure to keep explored diagonals.
Seeds may be position close to each other and some extention may cover them.
Changing the staring point of the HSP does not affect the outcome.
Extention is bi-directional and only depends on max_score and Xdrop
'''
def __init__(self):
self.ranges = {}
def append(self, coord, offset):
row, col = coord
diag = col - row
interval = range(row+offset[0], row+offset[1])
try:
self.ranges[diag].append(interval)
except KeyError:
self.ranges[diag] = [interval]
def __contains__(self, coord):
row, col = coord
diag = diag = col - row
for interval in self.ranges.get(diag, []):
if row in interval or col in interval:
return True
return False
def print_blast(aln, query, ref):
Eval, seed, offset, prot = aln
i, j = seed
b, f = offset
qseq = query[i-b:i+f]
rseq = str(ref[j-b:j+f])
mis = ''.join('|' if r == q else ' ' for q, r in zip(qseq, rseq))
head = f'Protein: {prot} E-value: {Eval:.2E}'
print('\n'.join([head,
'-' * len(head),
'Query: ' + qseq,
' ' + mis,
'Sbjct: ' + rseq]))Motivation:
How many random local alignments of score \(S\) or higher do we expect to recover for a query of length \(m\)?
Alignment are computed in 2 main steps:
def random_sequencer(m, alphabet, p=None):
if p is not None:
p = np.array(p)
assert np.all(p >= 0)
p = p / sum(p)
rng = np.random.default_rng()
while True:
yield ''.join(rng.choice(alphabet, m, replace=True, p=p))
amino_freq = get_amino_freq() # dictionary {A: freq_A, ...}
aminos, amino_p = zip(*amino_freq.items())
sequence_generator = random_sequencer(10, aminos, amino_p)
for i in range(10):
s = next(sequence_generator)
print(s)SDGAYTVDRV
VIRQTPLRRL
RKAEIELRVI
AQEGSYNCTT
FKLSIAVRNG
VRASPTRRRH
DRDGGMVEYN
LMFWKGFPRT
VATARSLSRQ
GKGMQEPSFD
Bigger matrices result in more seeds:
\[E \sim mn\]
The score of a random ungapped alignment equivalent to a negatively biased random walk.
The probability of observing a score drops exponentially:
\[E \sim e^{-\lambda s}\]
Building on these 2 intuition we can sketch a theory:
\[P(N(S) \ge 1) = 1 - e^E\]
BLAST uses a normalized version of the score to calculate the E-value:
\[ \begin{align} S' &= \frac{\lambda S - \log K}{\log 2}\\ E &= mn 2^{-S'} \end{align} \]
From Wikipedia
Dynamic programming is a method for solving a complex problem by breaking it down into a collection of simpler subproblems, solving each of those subproblems just once, and storing their solutions. The next time the same subproblem occurs, instead of recomputing its solution, one simply looks up the previously computed solution, thereby saving computation time at the expense of a (hopefully) modest expenditure in storage space.
We want to expand this approach now to address the optimal alignment problem. So we have to
A natural metric of size is the length of the (finally) aligned sequences (bigger than either)
\[ \begin{aligned} \hat{X} &= \hat{x}_1\hat{x}_2 \dots \hat{x}_k & \hat{x} &\in \{x_1,\dots,x_m, \_\}\\ \hat{Y} &= \hat{y}_1\hat{y}_2 \dots \hat{y}_k & \hat{y} &\in \{y_1,\dots,y_n, \_\} \\ \end{aligned} \]
Focus on the last elements \((\hat{x}_k, \hat{y}_k)\). There are 3 possibilities:
\[ \begin{aligned} \hat{x}_k &= x_m & \hat{y}_k &= y_n \\ \hat{x}_k &= x_m & \hat{y}_k &= \_ \\ \hat{x}_k &= \_ & \hat{y}_k &= y_n \end{aligned} \]
Each case correspond to a new smaller pair \((X', Y')\):
\[ \begin{aligned} X' &= x_1 \dots x_{m-1} & Y' &= y_1 \dots y_{n-1} \\ X' &= x_1 \dots x_{m-1} & Y' &= Y \\ X' &= X & Y' &= y_1 \dots y_{n-1} \end{aligned} \]
If someone the solutions to \(S_{X',Y'}, S_{X',Y}, S_{X,Y'}\) were known then we can construct the final solution:
\[ S_{X,Y} = \max \begin{cases} S_{X',Y'} &+& G(x_m, y_n) \\ S_{X',Y} &+& G(x_m, \_) \\ S_{X,Y'} &+& G(\_,y_n) \end{cases} \]
The optimal alignment of a string \(X\) with an empty string ’’ is:
\[ \begin{aligned} x_1 x_2 &\dots x_n \\ \mathtt{\_\_} &\dots \mathtt{\_\_} \end{aligned} \]
Finds global alignment
def NeedlemanWunsch(x, y, M):
# Initialize Variables
m, n = len(x) + 1, len(y) + 1
S = np.zeros((m, n), dtype=int) # cost matrix
P = np.zeros((m, n), dtype=int) # predecesor matrix
# Initialize 1st column/row
for i in range(1, m):
S[i, 0] = S[i-1, 0] + M(x[i-1], '_')
P[1:, 0] = 2
for j in range(1, n):
S[0, j] = S[0, j-1] + M('_', y[j-1])
P[0, 1:] = 3
# Main Recursion
for i in range(1, m):
for j in range(1, n):
scores = (S[i-1, j-1] + M(x[i-1], y[j-1]), # match
S[i-1, j ] + M(x[i-1], '_' ), # insertion
S[ i , j-1] + M( '_' , y[j-1])) # deletion
imax = np.argmax(scores)
S[i, j] = scores[imax]
P[i, j] = imax + 1
return S, PIgnores gap at the beginning and end of alignment ~ grep
def NeedlemanWunsch(x, y, M, algorithm="global"):
# Initialize Variables
m, n = len(x) + 1, len(y) + 1
S = np.zeros((m, n), dtype=int) # cost matrix
P = np.zeros((m, n), dtype=int) # predecesor matrix
# Initialize 1st column/row
if algorithm == 'global':
for i in range(1, m):
S[i, 0] = S[i-1, 0] + M(x[i-1], '_')
P[1:, 0] = 2
for j in range(1, n):
S[0, j] = S[0, j-1] + M('_', y[j-1])
P[0, 1:] = 3
elif algorithm == 'semi':
pass
else:
raise ValueError(f'Unknown algorithm specified: {algorithm}')
# Main Recursion
for i in range(1, m):
for j in range(1, n):
scores = (S[i-1, j-1] + M(x[i-1], y[j-1]), # match
S[i-1, j ] + M(x[i-1], '_' ), # insertion
S[ i , j-1] + M( '_' , y[j-1])) # deletion
imax = np.argmax(scores)
S[i, j] = scores[imax]
P[i, j] = imax + 1
return S, P
def SmithWaterman(x, y, M):
# Initialize Variables
m, n = len(x) + 1, len(y) + 1
S = np.zeros((m, n), dtype=int) # cost matrix
P = np.zeros((m, n), dtype=int) # predecesor matrix
# Main Recursion
for i in range(1, m):
for j in range(1, n):
scores = (0, # restart
S[i-1, j-1] + M(x[i-1], y[j-1]), # match
S[i-1, j ] + M(x[i-1], '_' ), # insertion
S[ i , j-1] + M( '_' , y[j-1])) # deletion
imax = np.argmax(scores)
S[i, j] = scores[imax]
P[i, j] = imax
return S, PHEAGAWGHE-E
|| || |
--P-AW-HEAE
AWGHE-E
|| || |
AW-HEAE
\[ G(\text{gap}) = C_{\text{open}} + C (l_{\text{gap}}) \]
\[ G(\text{gap}) = C_{\text{open}} + C \cdot l_{\text{gap}}\]
def SmithWaterman(x, y, M, gapOpen, gapExtend):
m, n = len(x) + 1, len(y) + 1
S = np.zeros((m, n), dtype=int) # score matrix
D = np.zeros((m, n), dtype=int) # deletion matrix
I = np.zeros((m, n), dtype=int) # insertion matrix
P = np.zeros((m, n), dtype=int) # predecesor matrix
for i in range(1, m):
for j in range(1, n):
D[i, j] = max(S[i, j-1] - gapOpen, # open a deletion gap
D[i, j-1] - gapExtend) # extend deletion gap
I[i, j] = max(S[i-1, j] - gapOpen, # open a insertion gap
D[i-1, j] - gapExtend) # extend insertion gap
scores = (0,
S[i-1, j-1] + M[x[i-1], y[j-1]],
I[i, j],
D[i, j])
imax = np.argmax(scores)
S[i, j] = scores[imax]
P[i, j] = imax
return S, PAWGHE-E
|| || |
AW-HEAE
AWGHEE
|| |
AWHEAE